Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical;

import static java.util.Objects.requireNonNull;

public class StreamPath
{
private String host;
private int port;
public boolean valid = false;

public StreamPath(String path)
{
requireNonNull(path);
if (path.contains("://"))
{
path = path.substring(path.indexOf("://") + 3);
}
int colon = path.indexOf(':');
if (colon > 0)
{
host = path.substring(0, colon);
port = Integer.parseInt(path.substring(colon + 1));
this.valid = true;
}
}

public String getHostName()
{
return host;
}

public int getPort()
{
return port;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class Constants
public static final int S3_BUFFER_SIZE = 8 * 1024 * 1024;
public static final int REDIS_BUFFER_SIZE = 8 * 1024 * 1024;
public static final int GCS_BUFFER_SIZE = 8 * 1024 * 1024;
public static final int STREAM_BUFFER_SIZE = 8 * 1024 * 1024;

public static final int MIN_REPEAT = 3;
public static final int MAX_SCOPE = 512;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@
public final class HttpServer
{
final HttpServerInitializer initializer;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel channel;

public HttpServer(HttpServerHandler handler) throws CertificateException, SSLException
{
this.initializer = new HttpServerInitializer(HttpServerUtil.buildSslContext(), handler);
handler.setServerCloser(this::close);
}

public void serve(int PORT) throws InterruptedException
{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try
{
ServerBootstrap b = new ServerBootstrap();
Expand All @@ -59,13 +63,28 @@ public void serve(int PORT) throws InterruptedException
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(this.initializer);

Channel ch = b.bind(PORT).sync().channel();

ch.closeFuture().sync();
channel = b.bind(PORT).sync().channel();
channel.closeFuture().sync();
} finally
{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public void close()
{
if (channel != null)
{
channel.close();
}
if (bossGroup != null)
{
bossGroup.shutdownGracefully();
}
if (workerGroup != null)
{
workerGroup.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
@ChannelHandler.Sharable
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject>
{
protected Runnable serverCloser;

@Override
public void channelReadComplete(ChannelHandlerContext ctx)
Expand Down Expand Up @@ -99,5 +100,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

ChannelFuture f = ctx.writeAndFlush(response);
f.addListener(ChannelFutureListener.CLOSE);
ctx.close();
}

public void setServerCloser(Runnable serverCloser) {
this.serverCloser = serverCloser;
}
}
86 changes: 86 additions & 0 deletions pixels-storage/pixels-storage-stream/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels</artifactId>
<version>0.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>pixels-storage-stream</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels-common</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>

<!-- asynchttpclient -->
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>${maven.plugin.deploy.version}</version>
<configuration>
<altDeploymentRepository>
local.mvn.repo::default::file://${project.parent.basedir}/mvn
</altDeploymentRepository>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven.plugin.source.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2024 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.storage.stream;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Storage;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static java.util.Objects.requireNonNull;

public class PhysicalStreamReader implements PhysicalReader
{
private final Storage stream;
private final String path;
private final DataInputStream dataInputStream;

public PhysicalStreamReader(Storage storage, String path) throws IOException
{
if (storage instanceof Stream)
{
this.stream = (Stream) storage;
}
else
{
throw new IOException("Storage is not LocalFS.");
}
this.path = path;
this.dataInputStream = storage.open(path);
}

@Override
public long getFileLength() throws IOException
{
throw new UnsupportedOperationException("Can't get file length in PhysicalStreamReader");
}

@Override
public void seek(long desired) throws IOException
{
throw new UnsupportedOperationException("Can't seek in PhysicalStreamReader");
}

@Override
public ByteBuffer readFully(int length) throws IOException
{
byte[] buffer = new byte[length];
dataInputStream.readFully(buffer);
return ByteBuffer.wrap(buffer);
}

@Override
public void readFully(byte[] buffer) throws IOException
{
dataInputStream.readFully(buffer);
}

@Override
public void readFully(byte[] buffer, int offset, int length) throws IOException
{
dataInputStream.readFully(buffer, offset, length);
}

@Override
public long readLong(ByteOrder byteOrder) throws IOException
{
if (requireNonNull(byteOrder).equals(ByteOrder.BIG_ENDIAN))
{
return dataInputStream.readLong();
}
else
{
return Long.reverseBytes(dataInputStream.readLong());
}
}

@Override
public int readInt(ByteOrder byteOrder) throws IOException
{
if (requireNonNull(byteOrder).equals(ByteOrder.BIG_ENDIAN))
{
return dataInputStream.readInt();
}
else
{
return Integer.reverseBytes(dataInputStream.readInt());
}
}

@Override
public boolean supportsAsync() { return false; }

@Override
public CompletableFuture<ByteBuffer> readAsync(long offset, int len) throws IOException
{
throw new UnsupportedOperationException("Can't read async in PhysicalStreamReader");
}

@Override
public void close() throws IOException
{
this.dataInputStream.close();
}

@Override
public String getPath() { return path; }

/**
* Get the port in path.
*
* @return
*/
@Override
public String getName()
{
if (path == null)
{
return null;
}
int slash = path.lastIndexOf(":");
return path.substring(slash + 1);
}

@Override
public long getBlockId() throws IOException
{
throw new IOException("Can't get block id in PhysicalStreamReader");
}

@Override
public Storage.Scheme getStorageScheme() { return stream.getScheme(); }

@Override
public int getNumReadRequests() { return 0; }
}
Loading